<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
    <title>Apache Flink: Accessing Data Stored in MongoDB with Stratosphere</title>
    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
    <link rel="icon" href="/favicon.ico" type="image/x-icon">

    <!-- Bootstrap -->
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
    <link rel="stylesheet" href="/css/flink.css">
    <link rel="stylesheet" href="/css/syntax.css">

    <!-- Blog RSS feed -->
    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />

    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
    <!--[if lt IE 9]>
      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
  </head>
  <body>  
    

  <!-- Top navbar. -->
    <nav class="navbar navbar-default navbar-fixed-top">
      <div class="container">
        <!-- The logo. -->
        <div class="navbar-header">
          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
            <span class="icon-bar"></span>
            <span class="icon-bar"></span>
            <span class="icon-bar"></span>
          </button>
          <div class="navbar-logo">
            <a href="/"><img alt="Apache Flink" src="/img/navbar-brand-logo.jpg" width="78px" height="40px"></a>
          </div>
        </div><!-- /.navbar-header -->

        <!-- The navigation links. -->
        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
          <ul class="nav navbar-nav">
            <!-- Overview -->
            <li><a href="/index.html">Overview</a></li>

            <!-- Quickstart -->
            <li class="dropdown">
              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Quickstart <span class="caret"></span></a>
              <ul class="dropdown-menu" role="menu">
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html">Setup</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/java_api_quickstart.html">Java API</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html">Scala API</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/run_example_quickstart.html">Run Step-by-Step Example</a></li>
              </ul>
            </li>

            <!-- Features -->
            <li><a href="/features.html">Features</a></li>

            <!-- Downloads -->
            <li><a href="/downloads.html">Downloads</a></li>

            <!-- Documentation -->
            <li class="dropdown">
              <a href="" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Documentation <span class="caret"></span></a>
              <ul class="dropdown-menu" role="menu">
                <!-- Latest stable release -->
                <li role="presentation" class="dropdown-header"><strong>Latest Release</strong> (Stable)</li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9">0.9.0 Documentation</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/java" class="active">0.9.0 Javadocs</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.9/api/scala/index.html" class="active">0.9.0 ScalaDocs</a></li>

                <!-- Snapshot docs -->
                <li class="divider"></li>
                <li role="presentation" class="dropdown-header"><strong>Snapshot</strong> (Development)</li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-master">0.10 Documentation</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/java" class="active">0.10 Javadocs</a></li>
                <li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/scala/index.html" class="active">0.10 ScalaDocs</a></li>

                <!-- Wiki -->
                <li class="divider"></li>
                <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
              </ul>
            </li>

            <!-- FAQ -->
            <li><a href="/faq.html">FAQ</a></li>
          </ul>

          <ul class="nav navbar-nav navbar-right">
            <!-- Blog -->
            <li class=" active hidden-md hidden-sm"><a href="/blog/">Blog</a></li>

            <li class="dropdown hidden-md hidden-sm">
              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Community <span class="caret"></span></a>
              <ul class="dropdown-menu" role="menu">
                <!-- Community -->
                <li role="presentation" class="dropdown-header"><strong>Community</strong></li>
                <li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
                <li><a href="/community.html#irc">IRC</a></li>
                <li><a href="/community.html#stack-overflow">Stack Overflow</a></li>
                <li><a href="/community.html#issue-tracker">Issue Tracker</a></li>
                <li><a href="/community.html#source-code">Source Code</a></li>
                <li><a href="/community.html#people">People</a></li>

                <!-- Contribute -->
                <li class="divider"></li>
                <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
                <li><a href="/how-to-contribute.html">How to Contribute</a></li>
                <li><a href="/coding-guidelines.html">Coding Guidelines</a></li>
              </ul>
            </li>

            <li class="dropdown hidden-md hidden-sm">
              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Project <span class="caret"></span></a>
              <ul class="dropdown-menu" role="menu">
                <!-- Project -->
                <li role="presentation" class="dropdown-header"><strong>Project</strong></li>
                <li><a href="/material.html">Material</a></li>
                <li><a href="https://twitter.com/apacheflink"><small><span class="glyphicon glyphicon-new-window"></span></small> Twitter</a></li>
                <li><a href="https://github.com/apache/flink"><small><span class="glyphicon glyphicon-new-window"></span></small> GitHub</a></li>
                <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
              </ul>
            </li>
          </ul>
        </div><!-- /.navbar-collapse -->
      </div><!-- /.container -->
    </nav>


    <!-- Main content. -->
    <div class="container">
      

<div class="row">
  <div class="col-sm-8 col-sm-offset-2">
    <div class="row">
      <h1>Accessing Data Stored in MongoDB with Stratosphere</h1>

      <article>
        <p>28 Jan 2014 by Robert Metzger (<a href="https://twitter.com/rmetzger_">@rmetzger_</a>)</p>

<p>We recently merged a <a href="https://github.com/stratosphere/stratosphere/pull/437">pull request</a> that allows you to use any existing Hadoop <a href="http://developer.yahoo.com/hadoop/tutorial/module5.html#inputformat">InputFormat</a> with Stratosphere. So you can now (in the <code>0.5-SNAPSHOT</code> and upwards versions) define a Hadoop-based data source:</p>

<div class="highlight"><pre><code class="language-java"><span class="n">HadoopDataSource</span> <span class="n">source</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">HadoopDataSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TextInputFormat</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">JobConf</span><span class="o">(),</span> <span class="s">&quot;Input Lines&quot;</span><span class="o">);</span>
<span class="n">TextInputFormat</span><span class="o">.</span><span class="na">addInputPath</span><span class="o">(</span><span class="n">source</span><span class="o">.</span><span class="na">getJobConf</span><span class="o">(),</span> <span class="k">new</span> <span class="nf">Path</span><span class="o">(</span><span class="n">dataInput</span><span class="o">));</span></code></pre></div>

<p>We describe in the following article how to access data stored in <a href="http://www.mongodb.org/">MongoDB</a> with Stratosphere. This allows users to join data from multiple sources (e.g. MonogDB and HDFS) or perform machine learning with the documents stored in MongoDB.</p>

<p>The approach here is to use the <code>MongoInputFormat</code> that was developed for Apache Hadoop but now also runs with Stratosphere.</p>

<div class="highlight"><pre><code class="language-java"><span class="n">JobConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JobConf</span><span class="o">();</span>
<span class="n">conf</span><span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">&quot;mongo.input.uri&quot;</span><span class="o">,</span><span class="s">&quot;mongodb://localhost:27017/enron_mail.messages&quot;</span><span class="o">);</span>
<span class="n">HadoopDataSource</span> <span class="n">src</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">HadoopDataSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">MongoInputFormat</span><span class="o">(),</span> <span class="n">conf</span><span class="o">,</span> <span class="s">&quot;Read from Mongodb&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">WritableWrapperConverter</span><span class="o">());</span></code></pre></div>

<h3 id="example-program">Example Program</h3>
<p>The example program reads data from the <a href="http://www.cs.cmu.edu/~enron/">enron dataset</a> that contains about 500k internal e-mails. The data is stored in MongoDB and the Stratosphere program counts the number of e-mails per day.</p>

<p>The complete code of this sample program is available on <a href="https://github.com/stratosphere/stratosphere-mongodb-example">GitHub</a>.</p>

<h4 id="prepare-mongodb-and-the-data">Prepare MongoDB and the Data</h4>

<ul>
  <li>Install MongoDB</li>
  <li>Download the enron dataset from <a href="http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/">their website</a>.</li>
  <li>Unpack and load it</li>
</ul>

<p><code>bash
 bunzip2 enron_mongo.tar.bz2
 tar xvf enron_mongo.tar
 mongorestore dump/enron_mail/messages.bson
</code></p>

<p>We used <a href="http://robomongo.org/">Robomongo</a> to visually examine the dataset stored in MongoDB.</p>

<p><img src="/img/blog/robomongo.png" style="width:90%;margin:15px" /></p>

<h4 id="build-mongoinputformat">Build <code>MongoInputFormat</code></h4>

<p>MongoDB offers an InputFormat for Hadoop on their <a href="https://github.com/mongodb/mongo-hadoop">GitHub page</a>. The code is not available in any Maven repository, so we have to build the jar file on our own.</p>

<ul>
  <li>Check out the repository</li>
</ul>

<div class="highlight"><pre><code>git clone https://github.com/mongodb/mongo-hadoop.git
cd mongo-hadoop
</code></pre></div>

<ul>
  <li>Set the appropriate Hadoop version in the <code>build.sbt</code>, we used <code>1.1</code>.</li>
</ul>

<div class="highlight"><pre><code class="language-bash">hadoopRelease in ThisBuild :<span class="o">=</span> <span class="s2">&quot;1.1&quot;</span></code></pre></div>
<ul>
  <li>Build the input format</li>
</ul>

<div class="highlight"><pre><code class="language-bash">./sbt package</code></pre></div>

<p>The jar-file is now located in <code>core/target</code>.</p>

<h4 id="the-stratosphere-program">The Stratosphere Program</h4>

<p>Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, out of Eclipse. To do that, check out the code …</p>

<div class="highlight"><pre><code class="language-bash">git clone https://github.com/stratosphere/stratosphere-mongodb-example.git</code></pre></div>

<p>… and import it as a Maven project into your Eclipse. You have to manually add the previously built mongo-hadoop jar-file as a dependency.
You can now press the “Run” button and see how Stratosphere executes the little program. It was running for about 8 seconds on the 1.5 GB dataset.</p>

<p>The result (located in <code>/tmp/enronCountByDay</code>) now looks like this.</p>

<div class="highlight"><pre><code>11,Fri Sep 26 10:00:00 CEST 1997
154,Tue Jun 29 10:56:00 CEST 1999
292,Tue Aug 10 12:11:00 CEST 1999
185,Thu Aug 12 18:35:00 CEST 1999
26,Fri Mar 19 12:33:00 CET 1999
</code></pre></div>

<p>There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON-documents. Since Stratosphere’s standard types do not support JSON documents, I was using the <code>WritableWrapper</code> here. This wrapper allows to use any Hadoop datatype with Stratosphere.</p>

<p>The following code example shows how the JSON-documents are accessed in Stratosphere.</p>

<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">map</span><span class="o">(</span><span class="n">Record</span> <span class="n">record</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Record</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
	<span class="n">Writable</span> <span class="n">valWr</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">getField</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">WritableWrapper</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">value</span><span class="o">();</span>
	<span class="n">BSONWritable</span> <span class="n">value</span> <span class="o">=</span> <span class="o">(</span><span class="n">BSONWritable</span><span class="o">)</span> <span class="n">valWr</span><span class="o">;</span>
	<span class="n">Object</span> <span class="n">headers</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">getDoc</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;headers&quot;</span><span class="o">);</span>
	<span class="n">BasicDBObject</span> <span class="n">headerOb</span> <span class="o">=</span> <span class="o">(</span><span class="n">BasicDBObject</span><span class="o">)</span> <span class="n">headers</span><span class="o">;</span>
	<span class="n">String</span> <span class="n">date</span> <span class="o">=</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="n">headerOb</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;Date&quot;</span><span class="o">);</span>
	<span class="c1">// further date processing</span>
<span class="o">}</span></code></pre></div>

<p>Please use the comments if you have questions or if you want to showcase your own MongoDB-Stratosphere integration.</p>

      </article>
    </div>

    <div class="row">
      <div id="disqus_thread"></div>
      <script type="text/javascript">
        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
        var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname

        /* * * DON'T EDIT BELOW THIS LINE * * */
        (function() {
            var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
             (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
        })();
      </script>
    </div>
  </div>
</div>

      <hr />
      <div class="footer text-center">
        <p>Copyright © 2014-2015 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
        <p>Apache Flink, Apache, and the Apache feather logo are trademarks of The Apache Software Foundation.</p>
        <p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
      </div>

    </div><!-- /.container -->

    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
    <!-- Include all compiled plugins (below), or include individual files as needed -->
    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
    <script src="/js/codetabs.js"></script>

    <!-- Google Analytics -->
    <script>
      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');

      ga('create', 'UA-52545728-1', 'auto');
      ga('send', 'pageview');
    </script>
  </body>
</html>
